import lombok.val;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Iterator;


public class WordCount
{



    /**
     * Keyed stateful flatMap that computes, per key, the average of the input
     * values over count windows of 2 elements.
     *
     * <p>For each key it keeps (count, runningSum) in {@link ValueState}. Once
     * 2 elements have been seen for a key, it emits (key, runningSum / count)
     * and clears the state so the next window starts fresh.
     */
    static class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
    {

        private transient ValueState<Tuple2<Long, Long>> sum; // (count, running sum) for the current key

        @Override
        public void open(Configuration config)
        {
            ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
                            "average", // the state name
                            TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
            // Expose this state to Flink's queryable-state API under this name.
            descriptor.setQueryable("query123");

            sum = getRuntimeContext().getState(descriptor);
        }

        /**
         * Accumulates (count, sum) for the element's key; every 2 elements
         * emits (key, average) and resets the state.
         *
         * @param input (key, value) pair; value is added to the running sum
         * @param out   collector receiving (key, average) once per full window
         */
        @Override
        public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception
        {
            Tuple2<Long, Long> currentSum = sum.value();
            if (currentSum == null)
            {
                // First element for this key: start from (count=0, sum=0).
                // (Was (0L, 1L), which skewed the sum before any value arrived.)
                currentSum = new Tuple2<>(0L, 0L);
            }

            currentSum.f0 += 1;          // count of elements seen
            currentSum.f1 += input.f1;   // accumulate the value (was "+= 1", which never summed inputs)
            sum.update(currentSum);

            // A count window of 2 elements is complete: emit the average and reset.
            if (currentSum.f0 >= 2)
            {
                out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
                sum.clear(); // was missing: without it the "window" never restarted
            }
        }
    }



//    ----------------------- top-level entry point below -----------------------

    /**
     * Job entry point: reads numbers as text lines from a socket, wraps each
     * one as (number, 1), keys the stream by the number, runs
     * {@link CountWindowAverage}, and prints the results.
     */
    public static void main(String[] args) throws Exception
    {
        final String jobManagerHost = "Desktop";
        final Integer jobManagerPort = 8085;
        final String jobJar = "/home/appleyuchi/桌面/Flink_Code/flink_state/target/datastream_api-1.0-SNAPSHOT.jar";

        // Submit against a remote cluster, shipping the job jar along.
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createRemoteEnvironment(jobManagerHost, jobManagerPort, jobJar);

        // Checkpoint every 5 seconds into a RocksDB backend on HDFS.
        env.enableCheckpointing(5000);
        env.setStateBackend(new RocksDBStateBackend("hdfs://Desktop:9000/user/hdfs/flink/checkpoints"));

        env.socketTextStream("Desktop", 9999)
                .flatMap(new FlatMapFunction<String, Tuple2<Long, Long>>() {
                    // Parse each text line into a (number, 1) pair.
                    @Override
                    public void flatMap(String line, Collector<Tuple2<Long, Long>> collector) throws Exception {
                        collector.collect(new Tuple2<>(Long.parseLong(line), 1L));
                    }
                })
                .keyBy(0)
                .flatMap(new CountWindowAverage())
                .keyBy(0)
                .print(); // write the resulting pairs to stdout of the task managers

        env.execute(WordCount.class.getCanonicalName()); // job name = fully-qualified class name
    }




}


//https://stackoverflow.com/questions/50022950/apache-flink-transform-datastream-source-to-a-list
//attempt to collect and print the stream output on the driver side